package com.galeno.sparksql02

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{Column, ColumnName, DataFrame, SparkSession}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

import java.util.Properties

/**
 * @Title: TestDSL风格.scala
 * @Description: Demonstrates Spark SQL DSL-style (DataFrame API) operations:
 *               select, where, groupBy/agg, orderBy, window functions, join and union.
 * @author galeno
 * @date 2021/9/6 10:24
 */
object TestDSL风格 {

  /**
   * Entry point: walks through the DataFrame (DSL-style) API —
   * column selection, filtering, aggregation, ordering, window
   * functions, JDBC joins and unions. Most examples are left
   * commented out so individual ones can be enabled while learning.
   */
  def main(args: Array[String]): Unit = {
    // Silence Spark's own logging so example output is readable.
    Logger.getLogger("org").setLevel(Level.ERROR)
    val spark = SparkSession.builder()
      .appName("aa")
      .master("local")
      // Required because the join demo below deliberately produces a Cartesian product.
      .config("spark.sql.crossJoin.enabled", "true")
      .getOrCreate()
    import spark.implicits._

    // Schema for the CSV input. `val`, not `var` — it is never reassigned.
    val schema = StructType(Seq(
      StructField("id", DataTypes.IntegerType),
      StructField("name", DataTypes.StringType),
      StructField("role", DataTypes.StringType),
      StructField("energy", DataTypes.DoubleType)
    ))
    val df = spark.read.option("header", "true").schema(schema).csv("data/battel2.txt")
    //df.show()
    // Select specific columns by name.
    //df.select("id", "name").show()
    // selectExpr parses SQL expressions (functions, arithmetic, ...).
    //df.selectExpr("id","name","role","energy+100").show()
    // Using Column objects directly.
    val id: Column = df("id") // Column object for "id"
    // When mixing Column objects, plain string column names are no longer accepted.
    //df.select(id,df("name")).show()
    // $"..." (ColumnName) is equivalent to a Column object; needs spark.implicits._
    val energy: ColumnName = $"energy"
    //df.select($"id",$"name",df("role"),energy).show()
    // Single-quote (Symbol) shorthand.
    val energy1: Symbol = 'energy
    //  df.select('name,'role,energy1).show()
    import org.apache.spark.sql.functions._
    // col(...) from the functions object — yet another way to build a Column.
    val name: Column = col("name")
    // df.select(col("id"),name,col("role")).show()

    //df.select(df("id"), 'name, col("role"), df("energy") + 10).show()


    /**
     * where / filter
     */

    //    df.where("id>3").show()
    //    df.where($"id">3).show()
    //    df.where('id<3).show()

    /**
     * groupBy
     */

    // df.groupBy("role").sum("energy","id").show() // only a single kind of aggregate
    // Multiple aggregates via agg(...).
    // NOTE: the alias must wrap the *aggregate* — min(col("id")).as("最小值") —
    // not the input column, otherwise the output column is named min(最小值).
    val df2: DataFrame = df.groupBy("role")
      .agg(sum($"energy").as("求和"), max(col("energy")).as("最大值"), min(col("id")).as("最小值"))
    // df2.show()

    /**
     * orderBy
     */
    // df.orderBy('energy.desc,'id.asc).show()

    /**
     * Window functions
     */
    val df3 = df.selectExpr("id", "name", "role", "energy", "row_number() over(partition by role order by energy desc) as rn ")
    df3.where("rn<=1").select("id", "name", "role", "energy").show()
    // Equivalent to the statement above:
    // df3.where("rn<=1").drop("rn").show()

    // Alternative: build the window with the DSL instead of a SQL string.
    // df.select('id, 'name, 'role, 'energy, row_number().over(Window.partitionBy(col("role")).orderBy('energy desc)).as("rn")).where("rn<=1").drop("rn").show()


    // join — JDBC source configured from mysql.properties on the classpath.
    val properties = new Properties()
    properties.load(this.getClass.getClassLoader.getResourceAsStream("mysql.properties"))
    val user = properties.getProperty("user")
    val password = properties.getProperty("password")
    val url = properties.getProperty("url")
    // Never echo credentials to the console: print user/url only, mask the password.
    println(s"jdbc user=$user url=$url password=****")
    val dfSq = spark.read.jdbc(url, "battel", properties)
    //    dfSq.show()
    //    df.show()
    // No join condition → Cartesian product (why crossJoin.enabled is set above).
    df.join(dfSq).show()
    // Express the join condition with Column equality.
    // df.join(dfSq,df("id")===dfSq("id")).drop(dfSq("work")).drop(dfSq("name")).drop(dfSq("id")).show()
    // usingColumn form: requires an identically named column on both sides.
    //    df.join(dfSq,"id").show()
      //  df.join(dfSq,Seq("id"),"left_semi").show()
    /**
     * union — here union is equivalent to UNION ALL;
     * append .distinct() to get SQL UNION semantics.
     */

//    df.union(dfSq).show()
//    df.union(dfSq).distinct().show()




  }


}
